{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Evaluation 12 - EMPIR\n",
    "This notebook implements the evaluation of [Tramer, Carlini, Brendel and Madry (2020)](https://arxiv.org/abs/2002.08347) using ART and focuses on section 12 evaluating \"EMPIR: Ensembles of Mixed Precision Deep Networks for Increased Robustness against Adversarial Attacks\".\n",
    "\n",
    "This notebook uses code from [Sen et al. (2020)](https://openreview.net/forum?id=HJem3yHKwH) at : https://github.com/sancharisen/EMPIR\n",
    "\n",
    "Before running this notebook you need to download the CIFAR-10 EMPIR models from https://github.com/sancharisen/EMPIR. into the local directory containing this notebook and save the 3 models into directories named `./CIFARconv/Model1`, `./CIFARconv/Model2`, and `./CIFARconv/Model3`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Using TensorFlow backend.\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'\n",
    "import sys\n",
    "import warnings\n",
    "warnings.filterwarnings('ignore')\n",
    "\n",
    "import tensorflow as tf\n",
    "tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)\n",
    "import keras\n",
    "from keras.datasets import cifar10\n",
    "from keras.utils import np_utils\n",
    "import numpy as np\n",
    "\n",
    "from art.estimators.classification import TensorFlowClassifier\n",
    "from art.attacks.evasion import ProjectedGradientDescent"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "if ! [[ -d \"./EMPIR\" ]]\n",
    "then\n",
    "    git clone git@github.com:sancharisen/EMPIR.git\n",
    "fi\n",
    "touch ./EMPIR/__init__.py"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "sys.path.append(\"./\")\n",
    "sys.path.append(\"./EMPIR\")\n",
    "from EMPIR.cleverhans.utils_tf import model_eval_ensemble"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "sess = tf.Session()\n",
    "keras.backend.set_session(sess)\n",
    "tf.set_random_seed(1234)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# CIFAR10-specific dimensions\n",
    "img_rows = 32\n",
    "img_cols = 32\n",
    "channels = 3\n",
    "nb_classes = 10\n",
    "\n",
    "# Model specifications\n",
    "nb_filters = 32\n",
    "batch_size = 128\n",
    "nb_samples = 10000\n",
    "\n",
    "abits=2\n",
    "wbits=4\n",
    "\n",
    "abits2=2\n",
    "wbits2=2\n",
    "\n",
    "model_path1 = './CIFARconv/Model1'\n",
    "model_path2 = './CIFARconv/Model2'\n",
    "model_path3 = './CIFARconv/Model3'\n",
    "\n",
    "# Scaling input to softmax\n",
    "INIT_T = 1.0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "X_train shape: (50000, 32, 32, 3)\n",
      "50000 train samples\n",
      "10000 test samples\n"
     ]
    }
   ],
   "source": [
    "def data_cifar10():\n",
    "    \"\"\"\n",
    "    Preprocess CIFAR10 dataset\n",
    "    :return:\n",
    "    \"\"\"\n",
    "\n",
    "    # These values are specific to CIFAR10\n",
    "    img_rows = 32\n",
    "    img_cols = 32\n",
    "    nb_classes = 10\n",
    "\n",
    "    # the data, shuffled and split between train and test sets\n",
    "    (X_train, y_train), (X_test, y_test) = cifar10.load_data()\n",
    "\n",
    "    X_train = X_train.reshape(X_train.shape[0], img_rows, img_cols, 3)\n",
    "    X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 3)\n",
    "\n",
    "    X_train = X_train.astype('float32')\n",
    "    X_test = X_test.astype('float32')\n",
    "    \n",
    "    X_train /= 255\n",
    "    X_test /= 255\n",
    "\n",
    "    print('X_train shape:', X_train.shape)\n",
    "    print(X_train.shape[0], 'train samples')\n",
    "    print(X_test.shape[0], 'test samples')\n",
    "\n",
    "    # convert class vectors to binary class matrices\n",
    "    Y_train = np_utils.to_categorical(y_train, nb_classes)\n",
    "    Y_test = np_utils.to_categorical(y_test, nb_classes)\n",
    "    return X_train, Y_train, X_test, Y_test\n",
    "\n",
    "# Get CIFAR10 test data\n",
    "X_train, Y_train, X_test, Y_test = data_cifar10()\n",
    "\n",
    "assert Y_train.shape[1] == 10.\n",
    "label_smooth = .1\n",
    "Y_train = Y_train.clip(label_smooth / 9., 1. - label_smooth)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create placeholders\n",
    "x = tf.placeholder(tf.float32, shape=(None, img_rows, img_cols, channels))\n",
    "y = tf.placeholder(tf.float32, shape=(None, 10))\n",
    "phase = tf.placeholder(tf.bool, name=\"phase\")\n",
    "logits_scalar = tf.placeholder_with_default(INIT_T, shape=(), name=\"logits_temperature\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture\n",
    "from EMPIR.cleverhans_tutorials.tutorial_models import make_ensemble_three_cifar_cnn\n",
    "model = make_ensemble_three_cifar_cnn(phase, logits_scalar, 'lp1_', 'lp2_', 'fp_', wbits, abits, wbits2,\n",
    "                                      abits2, input_shape=(None, img_rows, img_cols, channels),\n",
    "                                      nb_filters=nb_filters) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "%%capture\n",
    "preds_index = model.ensemble_call(x, reuse=False)\n",
    "preds_one_hot = tf.one_hot(preds_index, depth=nb_classes, on_value=None, off_value=None, axis=None,\n",
    "                           dtype=None, name=None)\n",
    "preds_prob = model.get_probs(x)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Test accuracy on legitimate test examples: 0.7256\n"
     ]
    }
   ],
   "source": [
    "variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)\n",
    "stored_variables = ['lp_conv1_init/k', 'lp_conv2_init/k', 'lp_conv3_init/k', 'lp_ip1init/W',\n",
    "                    'lp_logits_init/W']\n",
    "variable_dict = dict(zip(stored_variables, variables[:5]))\n",
    "\n",
    "# Restore the first set of variables from model_path1\n",
    "saver = tf.train.Saver(variable_dict)\n",
    "saver.restore(sess, tf.train.latest_checkpoint(model_path1))\n",
    "\n",
    "# Restore the second set of variables from model_path2\n",
    "variable_dict = dict(zip(stored_variables, variables[5:10]))\n",
    "saver2 = tf.train.Saver(variable_dict)\n",
    "saver2.restore(sess, tf.train.latest_checkpoint(model_path2))\n",
    "stored_variables = ['fp_conv1_init/k', 'fp_conv2_init/k', 'fp_conv3_init/k', 'fp_ip1init/W',\n",
    "                    'fp_logits_init/W']\n",
    "variable_dict = dict(zip(stored_variables, variables[10:]))\n",
    "saver3 = tf.train.Saver(variable_dict)\n",
    "saver3.restore(sess, tf.train.latest_checkpoint(model_path3))\n",
    "\n",
    "# Evaluate the accuracy of the CIFAR10 model on legitimate test examples\n",
    "eval_params = {'batch_size': batch_size}\n",
    "accuracy = model_eval_ensemble(sess, x, y, preds_index, X_test, Y_test, phase=phase, args=eval_params)\n",
    "print('Test accuracy on legitimate test examples: {0}'.format(accuracy))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_accuracy(X, Y, batch_size, predictions):\n",
    "    \n",
    "    sum_correct = 0\n",
    "    sum_samples = 0\n",
    "\n",
    "    with sess.as_default():\n",
    "\n",
    "        nb_batches = int(X.shape[0] / batch_size)\n",
    "\n",
    "        for i_batch in range(nb_batches):\n",
    "        \n",
    "            i_start = i_batch * batch_size\n",
    "            i_end = i_start + batch_size\n",
    "        \n",
    "            if i_end <= X.shape[0]:\n",
    "        \n",
    "                feed_dict = {x: X[i_start:i_end],\n",
    "                             phase: False}\n",
    "\n",
    "                y_pred = sess.run(predictions, feed_dict=feed_dict)\n",
    "                \n",
    "                sum_correct += np.sum(np.argmax(Y[i_start:i_end], axis=1) == np.argmax(y_pred, axis=1))\n",
    "                sum_samples += batch_size\n",
    "\n",
    "    accuracy = sum_correct / sum_samples\n",
    "    \n",
    "    return accuracy"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "The accuracy on benign test samples: 72.57%\n"
     ]
    }
   ],
   "source": [
    "accuracy_test_benign = get_accuracy(X=X_test, Y=Y_test, batch_size=batch_size, predictions=preds_one_hot)\n",
    "print('The accuracy on benign test samples: {0:.2f}%'.format(accuracy_test_benign *100))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is in agreement with the Unperturbed Accuracy of 72.56% reported by Sen et al. (2020)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# EMPIR Evaluation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(y_true=y, y_pred=preds_prob, from_logits=False,\n",
    "                                                               label_smoothing=0))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "feed_dict = {phase: False}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "classifier_empir = TensorFlowClassifier(input_ph=x,\n",
    "                                        output=preds_prob,\n",
    "                                        labels_ph=y,\n",
    "                                        train=None,\n",
    "                                        loss=loss,\n",
    "                                        learning=phase,\n",
    "                                        sess=sess,\n",
    "                                        channels_first=False,\n",
    "                                        clip_values=(0, 1),\n",
    "                                        preprocessing=(0, 1),\n",
    "                                        feed_dict=feed_dict)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "attack_empir = ProjectedGradientDescent(classifier=classifier_empir,\n",
    "                                        norm=np.inf,\n",
    "                                        eps=0.1,\n",
    "                                        eps_step=0.01,\n",
    "                                        max_iter=40,\n",
    "                                        targeted=False,\n",
    "                                        num_random_init=1,\n",
    "                                        batch_size=batch_size)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "X_test_adv = attack_empir.generate(X_test[:nb_samples], Y_test[:nb_samples])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Accuracy on adversarial test examples created by ART using EMPIR's loss: 11.57%\n"
     ]
    }
   ],
   "source": [
    "accuracy_test_adv = get_accuracy(X=X_test_adv, Y=Y_test, batch_size=batch_size, predictions=preds_one_hot)\n",
    "print('Accuracy on adversarial test examples created by ART using EMPIR\\'s loss: '\n",
    "      '{0:.2f}%'.format(accuracy_test_adv * 100))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Accuracy on adversarial test examples created by ART using EMPIR's loss is in agreement with the accuracy of 13.55% reported by Sen et al. (2020)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 12.3 Final Robustness Evaluation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture\n",
    "feed_dict = {x: X_test[0:12], phase: False}\n",
    "\n",
    "x_1 = x\n",
    "for layer in model.layers1:\n",
    "    x_1 = layer.fprop(x_1, reuse=False)\n",
    "    assert x_1 is not None\n",
    "preds_prob_1 = x_1\n",
    "    \n",
    "x_2 = x\n",
    "for layer in model.layers2:\n",
    "    x_2 = layer.fprop(x_2, reuse=False)\n",
    "    assert x_2 is not None\n",
    "preds_prob_2 = x_2\n",
    "    \n",
    "x_3 = x\n",
    "for layer in model.layers3:\n",
    "    x_3 = layer.fprop(x_3, reuse=False)\n",
    "    assert x_3 is not None\n",
    "preds_prob_3 = x_3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture\n",
    "preds_prob_new = (preds_prob_1 + preds_prob_2 + preds_prob_3) / 3\n",
    "loss_new = tf.keras.losses.categorical_crossentropy(y_true=y, y_pred=preds_prob_new, from_logits=False,\n",
    "                                                    label_smoothing=0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "feed_dict = {phase: False}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "classifier_eval = TensorFlowClassifier(input_ph=x,\n",
    "                                       output=preds_prob_new,\n",
    "                                       labels_ph=y,\n",
    "                                       train=None,\n",
    "                                       loss=loss_new,\n",
    "                                       learning=phase,\n",
    "                                       sess=sess,\n",
    "                                       channels_first=False,\n",
    "                                       clip_values=(0, 1),\n",
    "                                       preprocessing=(0, 1),\n",
    "                                       feed_dict=feed_dict)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "attack_eval = ProjectedGradientDescent(classifier=classifier_eval,\n",
    "                                       norm=np.inf,\n",
    "                                       eps=0.031,\n",
    "                                       eps_step=0.0078,\n",
    "                                       max_iter=100,\n",
    "                                       targeted=False,\n",
    "                                       num_random_init=1,\n",
    "                                       batch_size=batch_size)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "X_test_adv_final = attack_eval.generate(X_test[:nb_samples], Y_test[:nb_samples])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Accuracy on adversarial test examples created by ART using the loss by Tramer et al. (2020): 1.41%.\n"
     ]
    }
   ],
   "source": [
    "accuracy_test_adv_final = get_accuracy(X=X_test_adv_final, Y=Y_test, batch_size=batch_size,\n",
    "                                       predictions=preds_one_hot)\n",
    "print('Accuracy on adversarial test examples created by ART using the loss by Tramer et al. (2020): '\n",
    "      '{0:.2f}%.'.format(accuracy_test_adv_final * 100))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is in agreement with the accuracy of 1.5% reported by Tramer et al. (2020)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Exercises for the reader (from Tramèr et al.)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "1. We only construct a very simple loss function that reduces model accuracy to 1.5%. Can a stronger (probably consistent) loss function reduce the accuracy to 0%? At ε = 4/255?\n",
    "2. Try to attack each of the models $f_{i}$ individually. Are they all similarly robust, or is one harder to attack than the others? (If the robustness of the models differ, how might a more consistent loss function look to attack the full defense)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model 1 - Accuracy on benign test samples: 64.55%.\n",
      "Model 2 - Accuracy on benign test samples: 61.80%.\n",
      "Model 3 - Accuracy on benign test samples: 74.54%.\n"
     ]
    }
   ],
   "source": [
    "# Get accuracy on benign test samples for each model separately\n",
    "\n",
    "accuracy_test_benign_1 = get_accuracy(X=X_test, Y=Y_test, batch_size=batch_size, predictions=preds_prob_1)\n",
    "print('Model 1 - Accuracy on benign test samples: {0:.2f}%.'.format(accuracy_test_benign_1 * 100))\n",
    "accuracy_test_benign_2 = get_accuracy(X=X_test, Y=Y_test, batch_size=batch_size, predictions=preds_prob_2)\n",
    "print('Model 2 - Accuracy on benign test samples: {0:.2f}%.'.format(accuracy_test_benign_2 * 100))\n",
    "accuracy_test_benign_3 = get_accuracy(X=X_test, Y=Y_test, batch_size=batch_size, predictions=preds_prob_3)\n",
    "print('Model 3 - Accuracy on benign test samples: {0:.2f}%.'.format(accuracy_test_benign_3 * 100))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model 1 - Accuracy on adversarial test examples: 0.98%.\n",
      "Model 2 - Accuracy on adversarial test examples: 1.06%.\n",
      "Model 3 - Accuracy on adversarial test examples: 0.02%.\n"
     ]
    }
   ],
   "source": [
    "# Get accuracy on adversarial test examples for each model separately\n",
    "\n",
    "for i_pred, preds_prob_i in enumerate([preds_prob_1, preds_prob_2, preds_prob_3]):\n",
    "    \n",
    "    loss_i = tf.keras.losses.categorical_crossentropy(y_true=y, y_pred=preds_prob_i, from_logits=False,\n",
    "                                                      label_smoothing=0)\n",
    "\n",
    "    classifier_eval_i = TensorFlowClassifier(input_ph=x,\n",
    "                                             output=preds_prob_i,\n",
    "                                             labels_ph=y,\n",
    "                                             train=None,\n",
    "                                             loss=loss_i,\n",
    "                                             learning=phase,\n",
    "                                             sess=sess,\n",
    "                                             channels_first=False,\n",
    "                                             clip_values=(0, 1),\n",
    "                                             preprocessing=(0, 1),\n",
    "                                             feed_dict=feed_dict)\n",
    "\n",
    "    attack_eval_i = ProjectedGradientDescent(classifier=classifier_eval_i,\n",
    "                                             norm=np.inf,\n",
    "                                             eps=0.031,\n",
    "                                             eps_step=0.0078,\n",
    "                                             max_iter=100,\n",
    "                                             targeted=False,\n",
    "                                             num_random_init=1,\n",
    "                                             batch_size=batch_size)\n",
    "\n",
    "    X_test_adv_i = attack_eval_i.generate(X_test[:nb_samples], Y_test[:nb_samples])\n",
    "    \n",
    "    accuracy_test_adv_i = get_accuracy(X=X_test_adv_i, Y=Y_test, batch_size=batch_size,\n",
    "                                       predictions=preds_prob_i)\n",
    "    print('Model {0} - Accuracy on adversarial test examples: {1:.2f}%.'.format(i_pred + 1, \n",
    "                                                                               accuracy_test_adv_i * 100))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
